feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO #355

Merged
hamersaw merged 4 commits into lance-format:main from beinan:feat/blob-join-preservation
May 21, 2026

@beinan beinan commented Mar 27, 2026

Contributor

Summary

  • When blob columns flow through Spark's shuffle (e.g., INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost because getBinary() returned empty byte arrays (new byte[0]). The target table ended up with zero-length blobs.
  • This PR introduces a blob reference mechanism that preserves blob data through Spark's shuffle without materializing the full blob bytes (which could be MBs/GBs per row and would kill shuffle performance).

How it works

Read side — compact blob references flow through shuffle:

  • Instead of returning byte[0], blob columns now serialize compact ~100-byte BlobReference descriptors containing the source dataset URI, column name, and row address
  • The scanner automatically requests _rowaddr when blob columns are present, and strips it from the output after extracting the row addresses
  • These references are small enough to flow through Spark shuffle with negligible overhead
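
A minimal sketch of what such a descriptor could look like on the wire. The class name `BlobRefSketch`, the length-prefixed layout, and the magic value are assumptions for illustration only; the actual `BlobReference.java` format may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch of a compact blob reference; not the PR's actual BlobReference format. */
final class BlobRefSketch {
  // Placeholder magic header used to tell references apart from ordinary binary values.
  static final byte[] MAGIC = "LBRF".getBytes(StandardCharsets.US_ASCII);

  final String datasetUri;
  final String columnName;
  final long rowAddress;

  BlobRefSketch(String datasetUri, String columnName, long rowAddress) {
    this.datasetUri = datasetUri;
    this.columnName = columnName;
    this.rowAddress = rowAddress;
  }

  /** Layout (assumed): magic | uri length | uri | column length | column | row address. */
  byte[] encode() {
    byte[] uri = datasetUri.getBytes(StandardCharsets.UTF_8);
    byte[] col = columnName.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(
        MAGIC.length + Integer.BYTES + uri.length + Integer.BYTES + col.length + Long.BYTES);
    buf.put(MAGIC).putInt(uri.length).put(uri).putInt(col.length).put(col).putLong(rowAddress);
    return buf.array();
  }

  /** True when the value starts with the magic header. */
  static boolean isReference(byte[] value) {
    return value != null
        && value.length >= MAGIC.length
        && Arrays.equals(Arrays.copyOfRange(value, 0, MAGIC.length), MAGIC);
  }

  /** Reverses encode(); rejects values that do not carry the magic header. */
  static BlobRefSketch decode(byte[] value) {
    if (!isReference(value)) {
      throw new IllegalArgumentException("not a blob reference");
    }
    ByteBuffer buf = ByteBuffer.wrap(value, MAGIC.length, value.length - MAGIC.length);
    byte[] uri = new byte[buf.getInt()];
    buf.get(uri);
    byte[] col = new byte[buf.getInt()];
    buf.get(col);
    return new BlobRefSketch(
        new String(uri, StandardCharsets.UTF_8),
        new String(col, StandardCharsets.UTF_8),
        buf.getLong());
  }
}
```

With this layout the descriptor size is dominated by the URI and column name, so it stays in the tens to low hundreds of bytes regardless of how large the blob itself is.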

Write side — blob references are resolved to actual bytes:

  • A new BlobResolvingLargeBinaryWriter detects BlobReference magic headers in incoming binary values
  • It opens the source dataset (with caching) and calls Dataset.takeBlobs() to fetch the actual blob bytes
  • The real blob data is then written to the target table through the normal blob write path
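
The "open with caching" step can be sketched roughly as follows. `CachingResolverSketch` and its `BlobSource` interface are hypothetical stand-ins; in the PR the opened object is a Lance dataset and the fetch goes through `Dataset.takeBlobs()`, whose exact signature is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a resolver that opens each source once and reuses it. */
final class CachingResolverSketch implements AutoCloseable {
  /** Stand-in for an opened source dataset (assumption, not the Lance API). */
  interface BlobSource {
    byte[] fetch(String columnName, long rowAddress);

    void close();
  }

  private final Function<String, BlobSource> opener;
  private final Map<String, BlobSource> openSources = new HashMap<>();

  CachingResolverSketch(Function<String, BlobSource> opener) {
    this.opener = opener;
  }

  /** Opens the source on first use, then serves later lookups from the cache. */
  byte[] resolve(String datasetUri, String columnName, long rowAddress) {
    return openSources.computeIfAbsent(datasetUri, opener).fetch(columnName, rowAddress);
  }

  /** Releases every cached source; intended to run once when the write task ends. */
  @Override
  public void close() {
    for (BlobSource source : openSources.values()) {
      source.close();
    }
    openSources.clear();
  }
}
```

Keying the cache by dataset URI means a JOIN over two source tables opens two sources per task, not one per row.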

Key files

| File | Status | Description |
| --- | --- | --- |
| BlobReference.java | NEW | Compact serializable blob reference format (LBRF magic + dataset URI + column name + row address) |
| BlobReferenceResolver.java | NEW | Write-side resolver: opens source dataset → takeBlobs() → returns actual bytes |
| BaseBlobJoinTest.java | NEW | Tests for blob preservation through simple INSERT INTO SELECT, JOIN + INSERT, and mixed blob/non-blob JOINs |
| BlobJoinTest.java (3.4 + 3.5) | NEW | Concrete test implementations |
| BlobStructAccessor.java | Modified | Added blob reference context (dataset URI, column name, row addresses) and getBlobReference() |
| LanceArrowColumnVector.java | Modified | getBinary() returns blob reference instead of byte[0] |
| LanceFragmentScanner.java | Modified | Detects blob columns, requests _rowaddr, exposes dataset URI and blob column names |
| LanceFragmentColumnarBatchScanner.java | Modified | Extracts row addresses from _rowaddr, sets blob context on BlobStructAccessor, strips implicit _rowaddr |
| LanceArrowWriter.scala | Modified | Added BlobResolvingLargeBinaryWriter for blob-encoded columns |
| BaseBlobCreateTableTest.java | Modified | Updated assertions for new blob reference behavior |

Test plan

  • BlobJoinTest.testBlobPreservedDuringInsertIntoSelect: Simple INSERT INTO target SELECT FROM source preserves blob data
  • BlobJoinTest.testBlobPreservedDuringJoinAndInsert: JOIN of two blob tables preserves both blob columns in target
  • BlobJoinTest.testNonBlobColumnsPreservedDuringJoinWithBlobs: Non-blob columns survive JOIN+INSERT alongside blob columns
  • All existing BlobCreateTableTest tests pass (9/9)

🤖 Generated with Claude Code

@github-actions github-actions Bot added the enhancement New feature or request label Mar 27, 2026
jackye1995 pushed a commit to lance-format/lance that referenced this pull request Apr 7, 2026
## Summary

Add support for writing blob v2 columns with external URI references
that are outside registered base paths. This enables use cases like
INSERT INTO SELECT across Lance tables where the target table stores
external blob references pointing to the source table's blob files
instead of copying the actual blob bytes.

## Changes

- **WriteParams.java**: Add `allowExternalBlobOutsideBases`
Optional<Boolean> field, getter, and builder method
- **Fragment.java**: Pass the new field through `createWithFfiArray` and
`createWithFfiStream` native methods
- **fragment.rs (JNI)**: Thread the new `Optional<Boolean>` parameter
through all fragment creation functions to `extract_write_params`
- **utils.rs (JNI)**: Parse the new parameter and set
`allow_external_blob_outside_bases` on Rust `WriteParams`
- **blocking_dataset.rs (JNI)**: Pass `JObject::null()` for the new
param in `Dataset.write()` path (not needed there)

## Context

This is a prerequisite for lance-spark blob JOIN support
(lance-format/lance-spark#355). When blob data flows through Spark's
shuffle during JOIN + INSERT INTO, the target table needs to write
external blob references pointing to the source table's physical blob
files. The Rust `BlobPreprocessor` already supports this via
`allow_external_blob_outside_bases`, but the Java SDK had no way to set
it.

Ref: #6321, #6322

## Test plan

- [x] Rust JNI code compiles cleanly (no errors in changed files)
- [ ] Java unit tests (CI)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@beinan beinan force-pushed the feat/blob-join-preservation branch from d3a606c to 9fabe62 Compare May 13, 2026 20:16

@hamersaw hamersaw left a comment

Collaborator

Thanks for the PR! Should be a pretty significant help in reducing memory utilization. A few comments to iron things out, but I think I need to dive through the regular read paths to make sure this doesn't negatively affect performance there as well.

Comment thread lance-spark-base_2.12/src/main/scala/org/lance/spark/arrow/LanceArrowWriter.scala Outdated
@hamersaw

Collaborator

Diving through this a bit more. I'm not sure there's a reason to have a LargeBinaryWriter (existing) and a BlobResolvingLargeBinaryWriter (new). Right now, if we're reading from a blob column and writing to a regular binary we're going to dump a ton of BlobReferences rather than the actual bytes because it uses the LargeBinaryWriter which doesn't resolve the BlobReference. I think we should collapse these into a single writer to mitigate future issues.

@beinan

beinan commented May 15, 2026

Contributor Author

Good point — collapsed BlobResolvingLargeBinaryWriter into LargeBinaryWriter so there is now a single writer that always resolves blob references via resolveIfNeeded(). This means blob references are correctly resolved regardless of whether the target column is blob-encoded or regular binary, preventing silent data corruption.

@beinan

beinan commented May 17, 2026

Contributor Author

@hamersaw All feedback has been addressed and CI is fully green. Ready for another look when you get a chance!

@hamersaw hamersaw left a comment

Collaborator

Thanks for the iteration here, I think we're getting close!

Comment thread lance-spark-base_2.12/src/main/scala/org/lance/spark/arrow/LanceArrowWriter.scala Outdated
Comment thread lance-spark-base_2.12/src/main/scala/org/lance/spark/arrow/LanceArrowWriter.scala Outdated
@beinan

beinan commented May 19, 2026

Contributor Author

@hamersaw Addressed all four comments from the second review — CI is green again. Would appreciate another look when you have a moment!

@hamersaw hamersaw left a comment

Collaborator

Looks great! Let's resolve merge conflicts and then I'll get it in.

When blob columns flow through Spark's shuffle (e.g., INSERT INTO
target SELECT ... FROM source_a JOIN source_b), the actual blob data
was previously lost. This PR introduces a blob reference mechanism
that preserves blob data through shuffle without materializing the
full blob bytes.

Read side: blob columns serialize compact ~100-byte BlobReference
descriptors (LANCEREF magic + dataset URI + column name + row address)
instead of empty bytes. The scanner requests _rowaddr when blob columns
are present and strips it from the output.

Write side: LargeBinaryWriter detects BlobReference headers, buffers
them during setValue(), then batch-resolves all references in finish()
via a single takeBlobs() call per (dataset, column) group. Dataset
instances are cached across batches for the task lifetime.
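
A rough sketch of the "one fetch per (dataset, column) group" idea. `GroupedResolveSketch`, `Pending`, and `BatchFetcher` are hypothetical names; `BatchFetcher` stands in for a batched call such as takeBlobs() and is assumed to return one element per requested address, in request order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: resolve buffered references with one batched fetch per group. */
final class GroupedResolveSketch {
  /** A buffered reference plus the output row that is waiting for its bytes. */
  static final class Pending {
    final int rowIndex;
    final String datasetUri;
    final String columnName;
    final long rowAddress;

    Pending(int rowIndex, String datasetUri, String columnName, long rowAddress) {
      this.rowIndex = rowIndex;
      this.datasetUri = datasetUri;
      this.columnName = columnName;
      this.rowAddress = rowAddress;
    }
  }

  /** Stand-in for a batched fetch (assumption, not the Lance API). */
  interface BatchFetcher {
    List<byte[]> fetch(String datasetUri, String columnName, List<Long> rowAddresses);
  }

  /** Returns a map from output row index to resolved bytes. */
  static Map<Integer, byte[]> resolveAll(List<Pending> pending, BatchFetcher fetcher) {
    // Group by (datasetUri, columnName); a two-element list works as a composite key.
    Map<List<String>, List<Pending>> groups = new LinkedHashMap<>();
    for (Pending p : pending) {
      groups
          .computeIfAbsent(Arrays.asList(p.datasetUri, p.columnName), k -> new ArrayList<>())
          .add(p);
    }
    Map<Integer, byte[]> resolved = new HashMap<>();
    for (Map.Entry<List<String>, List<Pending>> entry : groups.entrySet()) {
      List<Pending> group = entry.getValue();
      List<Long> addresses = new ArrayList<>();
      for (Pending p : group) {
        addresses.add(p.rowAddress);
      }
      // Exactly one fetch call per group, however many rows the group holds.
      List<byte[]> blobs =
          fetcher.fetch(entry.getKey().get(0), entry.getKey().get(1), addresses);
      for (int i = 0; i < group.size(); i++) {
        resolved.put(group.get(i).rowIndex, blobs.get(i));
      }
    }
    return resolved;
  }
}
```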

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@beinan beinan force-pushed the feat/blob-join-preservation branch from 75d49e6 to 2cd8627 Compare May 20, 2026 03:36
Addresses review feedback on the blob JOIN/INSERT preservation path and
makes vended-credential auto-refresh work for blob sources.

Correctness:
- LargeBinaryWriter now buffers all per-row values and emits the vector in
  a single ascending pass at finish(), instead of back-filling resolved
  blobs out of order (which corrupts a LargeVarBinaryVector's offset
  buffer). resolveBatch() returns an index->bytes map rather than writing
  into the vector.

Resource lifecycle:
- The BlobReferenceResolver is now created once per write task, shared
  across batches/fragments, and closed at LanceDataWriter teardown —
  fixing a leak of native source datasets (one per blob column per batch).

Credentials (the main design change):
- New LanceBlobSourceContextRule optimizer rule collects each blob source
  table's BlobSourceContext (read options + namespace config) on the
  driver and stashes them, keyed by source URI, in the write command's
  options. LanceDataset.newWriteBuilder decodes them and threads them to
  the per-task resolver, which reopens sources via
  Utils.openDatasetBuilder().runtimeNamespace(...) so vended credentials
  keep auto-refreshing — mirroring compaction/index. No global registry,
  no per-row shuffle bloat. Registered in LanceSparkSessionExtensions.
- BlobReferenceResolver no longer opens datasets directly; falls back to
  open-by-URI when no context is present (local sources / extension off).

Performance:
- BlobStructAccessor precomputes the constant reference prefix once per
  batch in setBlobReferenceContext; the per-row path only appends the
  8-byte rowAddress.
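
A minimal sketch of that prefix precomputation, assuming a simple length-prefixed layout. `ReferencePrefixSketch` and its method names are illustrative only and do not mirror the real BlobStructAccessor.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch: encode the per-batch constant part once, append the address per row. */
final class ReferencePrefixSketch {
  private byte[] prefix = new byte[0];

  /** Called once per batch: encodes everything that is identical for every row. */
  void setContext(String datasetUri, String columnName) {
    byte[] uri = datasetUri.getBytes(StandardCharsets.UTF_8);
    byte[] col = columnName.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + uri.length + Integer.BYTES + col.length);
    buf.putInt(uri.length).put(uri).putInt(col.length).put(col);
    prefix = buf.array();
  }

  /** Per-row path: copy the cached prefix and write only the 8-byte row address after it. */
  byte[] referenceFor(long rowAddress) {
    byte[] out = Arrays.copyOf(prefix, prefix.length + Long.BYTES);
    ByteBuffer.wrap(out, prefix.length, Long.BYTES).putLong(rowAddress);
    return out;
  }
}
```

The per-row cost drops to one array copy and one `putLong`, with no string encoding on the scan path.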

Misc:
- Extract LargeBinaryWriter to its own file.
- Fold the new constructor arguments into single canonical constructors
  rather than parallel overloads; update affected tests.
- BaseBlobJoinTest enables the SQL extension so the rule path is covered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hamersaw and others added 2 commits May 21, 2026 11:29
Address PR review feedback on the blob join-preservation path:

- BlobReferenceResolver.resolveBatch: deduplicate row addresses within
  each (datasetUri, columnName) group and fan resolved bytes back out by
  address instead of by list position. Guard against takeBlobs count
  mismatch and null elements by failing loudly rather than writing wrong
  bytes. Document the takeBlobs ordering/1:1 contract.
- BlobStructAccessor.getBlobReference: test blob size with the primitive
  UInt8Vector accessor instead of boxing through a per-row BigInteger on
  the scan hot path.
- LargeBinaryWriter: buffer lazily — write rows directly until the first
  blob reference, then buffer only the tail. The common non-blob binary
  case now buffers nothing.

Tests (written as JUnit 5 in Scala so surefire actually executes them;
the existing ScalaTest *Suite classes are not picked up by the build):
- LargeBinaryWriterTest: direct/buffered ordering, reference resolution
  at correct indices, IOException->RuntimeException, reset.
- LanceBlobSourceContextRuleTest: non-Lance target no-op, blob-free
  source no-op, positive annotation, existing-key guard, idempotence.
- BaseBlobJoinTest: one-to-many JOIN test exercising resolver dedup.
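
The resolver dedup that the one-to-many JOIN test exercises can be sketched like this. `DedupFanOutSketch` is a hypothetical name, and the `batchFetch` function stands in for takeBlobs(); it is assumed to return exactly one non-null element per requested address, in request order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: fetch each distinct row address once, then fan out by address. */
final class DedupFanOutSketch {
  /** rowAddresses.get(i) is the source address wanted by output row i; duplicates are allowed. */
  static List<byte[]> resolve(
      List<Long> rowAddresses, Function<List<Long>, List<byte[]>> batchFetch) {
    // Deduplicate while keeping first-seen order so the request is deterministic.
    List<Long> unique = new ArrayList<>(new LinkedHashSet<>(rowAddresses));
    List<byte[]> fetched = batchFetch.apply(unique);
    if (fetched == null || fetched.size() != unique.size()) {
      throw new IllegalStateException("fetch returned an unexpected number of blobs");
    }
    Map<Long, byte[]> byAddress = new HashMap<>();
    for (int i = 0; i < unique.size(); i++) {
      byte[] blob = fetched.get(i);
      if (blob == null) {
        throw new IllegalStateException("missing blob for row address " + unique.get(i));
      }
      byAddress.put(unique.get(i), blob);
    }
    // Fan out by address rather than by list position, so repeated addresses share one fetch.
    List<byte[]> out = new ArrayList<>(rowAddresses.size());
    for (Long address : rowAddresses) {
      out.add(byAddress.get(address));
    }
    return out;
  }
}
```

Failing on a count mismatch or a null element keeps a misbehaving fetch from silently writing the wrong bytes into the target.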

Verified across Spark 3.5 and 4.1 (Scala 2.13).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Blob references flow through the shuffle as ~200-byte placeholders, so the
per-batch byte guard sized the references, not the blobs they resolve to. The
batch capped at 8192 rows; at finish() those references resolved to full blob
bytes all at once (resolved map + vector copy), reliably OOMing an executor at
the feature's target scale with maxBatchBytes providing no protection.

Carry the resolved blob size (already known from the source size vector at read
time) in BlobReference, and feed an exact buffered-bytes estimate into both the
semaphore and queued write buffers' byte budgets. A batch now flushes before its
resolved blobs exceed maxBatchBytes, bounding the Arrow vector; peak transient at
resolution stays ~2x maxBatchBytes.

- BlobReference: wire format v2 appends 8-byte size
- BlobStructAccessor: stamp real size onto each emitted reference
- LargeBinaryWriter: track pendingBytes, expose estimatedBufferedBytes
- LanceArrowWriter + field-writer base + container writers: propagate estimate
- Semaphore/Queued buffers: add estimate to per-batch byte total
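
A small sketch of the accounting change, under the assumption that each reference carries its resolved size. `ResolvedSizeBudgetSketch` and its method names are hypothetical and only illustrate why the estimate must count resolved bytes, not placeholder bytes.

```java
/** Hypothetical sketch: budget a batch by what references will resolve to. */
final class ResolvedSizeBudgetSketch {
  private final long maxBatchBytes;
  private long writtenBytes = 0; // bytes of values already written inline
  private long pendingBytes = 0; // bytes the buffered references will expand to at finish

  ResolvedSizeBudgetSketch(long maxBatchBytes) {
    this.maxBatchBytes = maxBatchBytes;
  }

  /** An ordinary binary value: its size is known as soon as it is written. */
  void addInline(int valueBytes) {
    writtenBytes += valueBytes;
  }

  /** A blob reference: count the size carried in the reference, not the placeholder size. */
  void addReference(long resolvedBlobBytes) {
    pendingBytes += resolvedBlobBytes;
  }

  long estimatedBytes() {
    return writtenBytes + pendingBytes;
  }

  /** True once the batch, after resolution, would reach the configured limit. */
  boolean shouldFlush() {
    return estimatedBytes() >= maxBatchBytes;
  }

  /** Called after a flush so the next batch starts from zero. */
  void reset() {
    writtenBytes = 0;
    pendingBytes = 0;
  }
}
```

For example, with a 1000-byte budget and references that each resolve to 400 bytes, the third reference trips the flush, whereas counting only ~200-byte placeholders would let the batch keep growing.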

Tests: size round-trip, estimatedBufferedBytes accounting/reset, and an
end-to-end buffer test proving references trip the byte budget.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@hamersaw

Collaborator

@beinan pushed a few commits here that:

  • pass a "resolver" to the Lance writer that carries each source dataset URI together with its storage options and related settings. The LargeBinaryWriter uses this to open the source dataset correctly (i.e., respecting vended credentials, etc.)
  • buffer take operations on writes so we don't issue a separate call for each row. This respects the configured max buffer bytes on writes.
  • various other allocation removals and cleanups to help combat some of the many OOM issues you're running into

@beinan

beinan commented May 21, 2026

Contributor Author

@hamersaw Thanks for the approval! Could you merge this when you get a chance?

@hamersaw hamersaw merged commit d3cfcea into lance-format:main May 21, 2026
17 checks passed